package com.joysuccess.consumer.snmp.consumer;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Set;

/**
 * Kafka consumer for real-time power-and-environment (SNMP) monitoring data.
 *
 * <p>Each message consumed from the {@code haode-dynamic-datas} topic is parsed
 * as JSON and the collection-point values it carries are written into a Redis
 * hash keyed by the asset id.
 *
 * @author zhangqing
 * @date 2019-05-04
 */
@Slf4j
@Component
public class DynamicSnmpPointConsumer {
    private static final String DYNAMIC_DATAS_TOPIC = "haode-dynamic-datas";

    // Kept as a raw type on purpose: adding type arguments could change which
    // RedisTemplate bean Spring selects for injection.
    @SuppressWarnings("rawtypes")
    @Autowired
    RedisTemplate redisTemplate;

    /**
     * Consumes one monitoring message and stores its point values in Redis.
     *
     * <p>The payload is read as a JSON object with two fields:
     * <ul>
     *   <li>{@code assetId} - used as the Redis hash key;</li>
     *   <li>{@code oidList} - a JSON object (or a string containing one) whose
     *       entries are written as the hash fields and values.</li>
     * </ul>
     *
     * <p>Messages with an empty payload, a missing {@code assetId} or a missing
     * or empty {@code oidList} are logged and skipped. Malformed JSON is not
     * caught here; the parser exception propagates to the caller.
     *
     * @param record the Kafka record whose value holds the JSON payload
     */
    @SuppressWarnings("unchecked") // raw RedisTemplate, see the field comment
    @KafkaListener(topics = DYNAMIC_DATAS_TOPIC)
    public void onMessageRedisString(ConsumerRecord<?, String> record) {
        // parseObject yields null for a null or empty payload (e.g. a tombstone record).
        JSONObject assetOids = JSONObject.parseObject(record.value());
        if (assetOids == null) {
            log.warn("Skipping record with empty payload: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            return;
        }

        // getString (instead of a hard cast) also accepts a numeric asset id.
        String assetId = assetOids.getString("assetId");
        if (assetId == null || assetId.isEmpty()) {
            log.warn("Skipping record without assetId: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            return;
        }

        // getString returns the JSON text whether oidList was sent as a string
        // or as a nested object, so both producer formats are handled.
        Map<String, Object> oidMap = JSONObject.parseObject(assetOids.getString("oidList"));
        if (oidMap == null || oidMap.isEmpty()) {
            log.warn("Skipping record without oidList entries: assetId={}, offset={}",
                    assetId, record.offset());
            return;
        }

        // Hash layout: assetId -> (oid -> latest value). One putAll replaces a
        // Redis round trip per collection point.
        redisTemplate.opsForHash().putAll(assetId, oidMap);
    }
}
